Recommender Systems in Spark

Recommender Systems are a set of methods able to predict the 'rating' or 'preference' that a user would give to an item. Among the different approaches to designing these systems, in this lab session we are going to work with Collaborative Filtering (CF) approaches. However, unlike the previous lab session, we are going to work with distributed implementations based on Spark.

Throughout this notebook we are going to use the MovieLens dataset. The MovieLens datasets were collected by the GroupLens Research Project at the University of Minnesota. A larger version of this dataset contains 10 million ratings applied to 10681 movies by 71567 users. However, for this lab, we will use a reduced version consisting of 100,000 ratings (with values from 1 to 5) from 943 users on 1682 movies, where each user has rated at least 20 movies.

As you progress in this notebook, you will have to complete some exercises. Each exercise includes an explanation of what is expected, followed by code cells where one or several lines contain a <FILL IN> (or # FILL IN) placeholder. The cell that needs to be modified has # TODO: Replace <FILL IN> with appropriate code in its header. Once the placeholders are replaced, the code can be run; below this cell you will find the test cell (beginning with the line # TEST CELL), which you can run to verify the correctness of your solution.

Reading and preprocessing the data


In [ ]:
# Import some libraries
import numpy as np
import math
from test_helper import Test

In [ ]:
# Define data file
ratingsFilename = 'u.data'
# Read data with spark 
rawRatings = sc.textFile(ratingsFilename)

# Check file format
print rawRatings.take(10)

Formatting the data

As you can check, each line is formatted as: UserID \t MovieID \t Rating \t Timestamp \n. So, let's convert each line into a tuple with the fields (UserID, MovieID, Rating) (we drop the timestamp because we do not need it for this exercise).

To work in a distributed way, let's start by implementing a function (format_ratings) that converts each line into the desired format. Then, we can apply this function to every line of the RDD with the map() method.

Tip: check the Python string method split(), which splits a string into a list of items at a given separator character.
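For instance, split('\t') breaks a tab-separated string into a list of strings, which still have to be converted to numbers explicitly. The record below is made up for illustration and is not a MovieLens line:

```python
# A made-up tab-separated record (not a line from u.data)
record = 'alice\t42\t3.5'

fields = record.split('\t')  # ['alice', '42', '3.5']: the items are still strings
name = fields[0]
count = int(fields[1])       # '42'  -> 42
score = float(fields[2])     # '3.5' -> 3.5
```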

1. Create function format_ratings( )


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
def format_ratings(line):
    """ Parse a line in the ratings dataset
    Args:
        line (str): a line in the ratings dataset in the form of UserID \t MovieID \t Rating \t Timestamp \n
    Returns:
        tuple: (UserID, MovieID, Rating)
    """
    # Split the line at each tab character '\t'
    items = # FILL IN
    # Get UserID and convert it to int
    user_id = # FILL IN
    # Get MovieID (ItemID) and convert it to int
    item_id = # FILL IN
    # Get Rating and convert it to float
    rating = # FILL IN
    # Return (UserID, ItemID, Rating)
    return # FILL IN

In [ ]:
###########################################################
# TEST CELL
###########################################################
check_line = u'196\t242\t3\t881250949'
check_tuple = format_ratings(check_line)
Test.assertEquals(check_tuple, (196, 242, 3), 'incorrect result: data are incorrectly formatted')

2. Format your data

Convert the RDD rawRatings into a new RDD where each line has been transformed with the function format_ratings().


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# Convert each line of rawRatings
ratingsRDD = # FILL IN
# Show the output
print ratingsRDD.take(10)

In [ ]:
###########################################################
# TEST CELL
###########################################################

Test.assertEquals(ratingsRDD.first(), (196, 242, 3), 'incorrect result: data are incorrectly formatted')

Creating training and test rating matrices

Now, to be able to train and evaluate the different methods, let's divide the rating matrix into two different matrices:

  • one with 75% of the ratings, for training the different recommenders;
  • the other, with the remaining 25%, for testing purposes.

Hint: you can apply the randomSplit() method of the RDD to divide it at random.
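randomSplit() takes a list of weights (normalized if they do not sum to 1) and assigns each element to one of the splits at random, so the split sizes are only approximately proportional to the weights; this is why the test cell below expects 75008 and 24992 ratings rather than exactly 75000 and 25000. The fixed seed makes the result reproducible. The plain-Python sketch below mimics this behavior on a local list; the helper random_split is hypothetical and is not Spark's actual sampling code, and the weights [0.8, 0.2] are just an example:

```python
import random

def random_split(items, weights, seed=0):
    """Hypothetical local analogue of RDD.randomSplit(): each element is
    assigned independently to one split, with probability proportional to
    the split's weight (weights are normalized to sum to 1)."""
    total = float(sum(weights))
    # Upper boundary of each split inside [0, 1)
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for x in items:
        u = rng.random()  # uniform in [0, 1)
        for k, b in enumerate(bounds):
            if u < b:
                splits[k].append(x)
                break
        else:
            # Only reached if floating-point round-off left the last bound below u
            splits[-1].append(x)
    return splits

train, test = random_split(range(10000), [0.8, 0.2], seed=0)
# Close to, but usually not exactly, 8000 and 2000 elements
print('%d %d' % (len(train), len(test)))
```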


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
trainingRDD, testRDD = ratingsRDD.randomSplit(<FILL IN>, seed=0L)

print 'Training: %s,  test: %s\n' % (trainingRDD.count(), testRDD.count())

In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(trainingRDD.count(), 75008, 'incorrect result: number of training ratings is incorrect')
Test.assertEquals(testRDD.count(), 24992, 'incorrect result: number of test ratings is incorrect')
Test.assertEquals(trainingRDD.first(), (186, 302, 3.0), 'incorrect result: the values of the training RDD are incorrect')
Test.assertEquals(testRDD.first(), (196, 242, 3.0), 'incorrect result: the values of the testing RDD are incorrect')

Baseline recommender

In this section we are going to build a mean-based baseline; that is, the recommender will predict each new rating as the average of the ratings that the user has given to previously rated items.

To design this approach, let's start by building a function that, given a user_id and all its associated ratings, computes the average of those ratings.

1. Build function getAverages( )


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

def getAverages(IDandRatingsTuple):
    """ Calculate average rating
    Args:
        IDandRatingsTuple: a single tuple of (ID_user, (Rating1, Rating2, Rating3, ...))
    Returns:
        tuple: a tuple of (ID_user, averageRating)
    """
    id_user = # FILL IN
    mean_value = # FILL IN
    return (id_user, mean_value)

In [ ]:
###########################################################
# TEST CELL
###########################################################
check_ratings = (0, iter([2, 5, 3, 1, 2]))
check_output = getAverages(check_ratings)
Test.assertEquals(check_output, (0, 2.6), 'incorrect result: check_output is incorrect')

2. Compute the average rating of each user

Next, let's use the getAverages( ) function to compute the average rating of every user in a distributed way.
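Conceptually, the next cell is a map that keeps (UserID, Rating), a grouping by key, and a per-key average. The plain-Python sketch below reproduces these three steps on a tiny made-up list, with collections.defaultdict standing in for groupByKey(); it only illustrates the data shapes at each step, not Spark's execution. It also converts each group with list() before averaging: getAverages() may receive a one-shot iterator (its test cell passes one), which has no len() and can be traversed only once.

```python
from collections import defaultdict

# Toy (UserID, MovieID, Rating) triples, made up for illustration
ratings = [(1, 10, 4.0), (1, 20, 2.0), (2, 10, 5.0), (1, 30, 3.0), (2, 40, 3.0)]

# Step 1: drop the MovieID -> (UserID, Rating)
user_rating = [(u, r) for (u, m, r) in ratings]

# Step 2: collect the ratings of each user (what groupByKey() does)
grouped = defaultdict(list)
for u, r in user_rating:
    grouped[u].append(r)

# Step 3: average each group; list() makes a one-shot iterator reusable
def mean_of(values):
    values = list(values)
    return sum(values) / float(len(values))

user_mean = dict((u, mean_of(rs)) for u, rs in grouped.items())
print(user_mean)  # {1: 3.0, 2: 4.0}
```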


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
# From trainingRDD, with tuples of (UserID, MovieID, Rating), create an RDD with tuples of
# (UserID, Rating), i.e., remove the MovieID field.
RDD_users_ratings = trainingRDD.# FILL IN 

# From the RDD of (UserID, Rating), create an RDD with tuples of
# (UserID, iterable of Ratings for that UserID), where the iterable contains
# the ratings of all the items rated by UserID. Review the groupByKey() method of RDDs.
RDD_users_allratings = RDD_users_ratings.# FILL IN 

# Using getAverages(), compute the average rating of each user.
RDD_users_mean  = RDD_users_allratings.# FILL IN

In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(RDD_users_ratings.first(), (186, 3.0), 'incorrect result: RDD_users_ratings is incorrect')
Test.assertEquals(list(RDD_users_allratings.first()[1])[:5], [4.0, 5.0, 4.0, 3.0, 3.0], 'incorrect result: RDD_users_allratings is incorrect')
Test.assertEquals(np.round(RDD_users_mean.first()[1],2), 3.69, 'incorrect result: RDD_users_mean is incorrect')

3. Make new predictions

Now, let's make predictions for our test data: for each (user, item) pair of testRDD, we compute the predicted rating, which is simply the average rating of the corresponding user.
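The join in the next cell keeps every test pair and attaches the user's mean to it. The plain-Python sketch below, with a hypothetical helper left_outer_join and made-up data, shows the output shape of a left outer join and what happens to a user who has no entry on the right-hand side: that user is kept, paired with None.

```python
# Toy keyed data, made up for illustration
test_pairs = [(1, 10), (2, 20), (3, 30)]   # (UserID, MovieID)
user_means = [(1, 3.5), (2, 4.0)]          # (UserID, mean rating); user 3 is missing

def left_outer_join(left, right):
    """Hypothetical local analogue of RDD.leftOuterJoin(): keep every element
    of `left` and pair it with each matching value of `right`, or with None."""
    right_by_key = {}
    for k, v in right:
        right_by_key.setdefault(k, []).append(v)
    joined = []
    for k, v in left:
        for w in right_by_key.get(k, [None]):
            joined.append((k, (v, w)))
    return joined

joined = left_outer_join(test_pairs, user_means)
# [(1, (10, 3.5)), (2, (20, 4.0)), (3, (30, None))]

# Flatten (UserID, (MovieID, PredRating)) into (UserID, MovieID, PredRating)
flat = [(user, item, pred) for (user, (item, pred)) in joined]
# [(1, 10, 3.5), (2, 20, 4.0), (3, 30, None)]
```

Unlike this list-based sketch, Spark does not guarantee the order of the joined elements.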


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# Create a new RDD, RDD_test_ids, consisting of (UserID, MovieID) pairs 
# that you extract from testRDD. That is, remove the field Rating from testRDD.
RDD_test_ids = testRDD.# FILL IN

# Using the user_id as key, join RDD_test_ids with RDD_users_mean.
# Review the method leftOuterJoin() of RDD elements.
RDD_test_ids_mean = RDD_test_ids.# FILL IN

# Note that the resulting RDD provided by the leftOuterJoin() method has the format
# (Iduser, (IdItem, PredRating)). Remap it to create an RDD with tuples (Iduser, IdItem, PredRating)
RDD_pred_mean = RDD_test_ids_mean.# FILL IN

In [ ]:
###########################################################
# TEST CELL
###########################################################
Test.assertEquals(RDD_test_ids.first(), (196, 242), 'incorrect result: RDD_test_ids is incorrect')
Test.assertEquals(RDD_test_ids_mean.first(), (512, (23, 4.294117647058823)), 'incorrect result: RDD_test_ids_mean is incorrect')
Test.assertEquals(RDD_pred_mean.first(), (512, 23, 4.294117647058823), 'incorrect result: RDD_pred_mean is incorrect')

4. Performance evaluation

Finally, let's evaluate the quality of the computed predictions over the test data. To do so, we are going to use two measures:

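  • The Mean Absolute Error: $$MAE = \frac{1}{N} \sum_{i=1}^N |p_i - r_i|$$
  • The Root Mean Square Error: $$ RMSE = \sqrt{\frac{1}{N} \sum_{i=1}^N (p_i - r_i)^2}$$

where $p_i$ and $r_i$ are the predicted and actual values of the $i$-th test rating, and $N$ is the number of test ratings.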
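As a worked example of both formulas, the plain-Python sketch below uses the same two toy ratings as the get_MAE() test cell (predictions 5 and 3 against actual ratings 3 and 2), stored in dictionaries keyed by (UserID, MovieID) instead of RDDs:

```python
import math

# Toy ratings keyed by (UserID, MovieID); same values as the get_MAE() test cell
predicted = {(0, 0): 5.0, (0, 1): 3.0}
actual = {(0, 0): 3.0, (0, 1): 2.0}

# Compare only the pairs present in both (this is what the join does)
common = [key for key in predicted if key in actual]
errors = [predicted[key] - actual[key] for key in common]   # p_i - r_i
n = float(len(errors))

mae = sum(abs(e) for e in errors) / n               # (2 + 1) / 2 = 1.5
rmse = math.sqrt(sum(e * e for e in errors) / n)    # sqrt((4 + 1) / 2)
print('MAE = %.4f, RMSE = %.4f' % (mae, rmse))       # MAE = 1.5000, RMSE = 1.5811
```

Because squaring weights large errors more heavily, the RMSE is always at least as large as the MAE, as in this example.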

The next cell contains a function that, given two RDDs (the first with the predicted ratings and the second with the actual ratings), computes the RMSE. Use it as an example to create a new function that computes the MAE.


In [ ]:
def get_RMSE(predictedRDD, actualRDD):
    """ Compute the root mean squared error between two RDD with the predicted and actual ratings
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
                      (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        RMSE (float): computed RMSE value
    """
    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = predictedRDD.map(lambda x: ((x[0],x[1]),x[2]))

    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = actualRDD.map(lambda x: ((x[0],x[1]),x[2]))

    # Compute the squared error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformations - do not use collect()
    squaredErrorsRDD = (predictedReformattedRDD.join(actualReformattedRDD).map(lambda x: pow(x[1][0]-x[1][1],2)))

    # Compute the total squared error - do not use collect()
    totalError = squaredErrorsRDD.reduce(lambda a,b: a+b)

    # Count the number of entries for which you computed the total squared error
    numRatings = squaredErrorsRDD.count()

    # Using the total squared error and the number of entries, compute the RMSE
    return math.sqrt(float(totalError)/numRatings )

In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# Create a function to compute the MAE error
def get_MAE(predictedRDD, actualRDD):
    """ Compute the mean absolute error between predicted and actual
    Args:
        predictedRDD: predicted ratings for each movie and each user where each entry is in the form
                      (UserID, MovieID, Rating)
        actualRDD: actual ratings where each entry is in the form (UserID, MovieID, Rating)
    Returns:
        MAE (float): computed MAE value
    """
    # Transform predictedRDD into the tuples of the form ((UserID, MovieID), Rating)
    predictedReformattedRDD = # FILL IN 

    # Transform actualRDD into the tuples of the form ((UserID, MovieID), Rating)
    actualReformattedRDD = # FILL IN 

    # Compute the absolute error for each matching entry (i.e., the same (User ID, Movie ID) in each
    # RDD) in the reformatted RDDs using RDD transformations - do not use collect()
    AbsoluteErrorsRDD = # FILL IN 
    
    # Compute the total absolute error - do not use collect()
    totalError = # FILL IN 

    # Count the number of entries for which you computed the total absolute error
    numRatings = # FILL IN 

    # Using the total absolute error and the number of entries, compute the MAE
    return # FILL IN

In [ ]:
###########################################################
# TEST CELL
###########################################################
check_Predicted = sc.parallelize([(0, 0, 5), (0, 1, 3)])
check_Actual = sc.parallelize([(0, 0, 3), (0, 1, 2)])

Test.assertEquals(get_MAE(check_Predicted, check_Actual), 1.5, 'incorrect result: function get_MAE() is incorrect')

Now, let's evaluate the performance of the mean-based baseline.


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# Compute the MAE of the mean-based baseline
MAE_mean = # FILL IN
# Compute the RMSE of the mean-based baseline
RMSE_mean = # FILL IN

print 'Mean model ... MAE: %2.2f , RMSE: %2.2f ' % (MAE_mean, RMSE_mean)

In [ ]:
###########################################################
# TEST CELL
###########################################################

Test.assertEquals(np.round(MAE_mean,2), 0.83, 'incorrect result: MAE value of mean recommender is incorrect')
Test.assertEquals(np.round(RMSE_mean,2), 1.04, 'incorrect result: RMSE value of mean recommender is incorrect')

Alternating Least Squares algorithm

Now, let's work with the ALS algorithm. As you know, this method approximates the ratings matrix by factorizing it as the product of two matrices:

$$ R \approx X Y^T $$

where $X$ (one row per user) describes properties of each user, and $Y$ (one row per item) describes properties of each item. These two matrices are known as latent factors, since they provide a low-dimensional representation of users and items: each row has only as many entries as latent factors.
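To make the alternating idea concrete, here is a minimal NumPy sketch on a tiny dense toy matrix. With $Y$ fixed, each row $x_u$ of $X$ is the solution of a small regularized least-squares problem over the items that user rated, $(Y_u^T Y_u + \lambda I)\,x_u = Y_u^T r_u$, where $Y_u$ holds the rows of $Y$ for those items and $r_u$ the corresponding ratings; then the roles of $X$ and $Y$ are swapped. The function als_dense and its parameters are hypothetical; this is an illustration of the technique only, and MLlib's distributed implementation may differ in details such as data partitioning and how the regularization term is scaled.

```python
import numpy as np

def als_dense(R, observed, k=2, lam=0.01, n_iters=50, seed=0):
    """Hypothetical minimal ALS on a small dense matrix.

    R        : (n_users, n_items) ratings; entries where observed is False are ignored
    observed : boolean array with the same shape as R
    Returns X (n_users, k) and Y (n_items, k) with R approximated by X.dot(Y.T).
    """
    rng = np.random.RandomState(seed)
    n_users, n_items = R.shape
    X = rng.normal(scale=0.1, size=(n_users, k))
    Y = rng.normal(scale=0.1, size=(n_items, k))
    reg = lam * np.eye(k)
    for _ in range(n_iters):
        # Y fixed: one regularized least-squares problem per user
        for u in range(n_users):
            Yu = Y[observed[u]]   # factors of the items rated by user u
            X[u] = np.linalg.solve(Yu.T.dot(Yu) + reg, Yu.T.dot(R[u, observed[u]]))
        # X fixed: one regularized least-squares problem per item
        for i in range(n_items):
            Xi = X[observed[:, i]]   # factors of the users who rated item i
            Y[i] = np.linalg.solve(Xi.T.dot(Xi) + reg, Xi.T.dot(R[observed[:, i], i]))
    return X, Y

# Toy 4 x 4 ratings matrix; 0 marks a missing rating (made up for illustration)
R = np.array([[5, 4, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [0, 1, 5, 4]], dtype=float)
observed = R > 0

X, Y = als_dense(R, observed)
R_hat = X.dot(Y.T)   # predicted ratings, including the missing entries
train_rmse = np.sqrt(np.mean((R_hat[observed] - R[observed]) ** 2))
print('RMSE on the observed entries: %.3f' % train_rmse)
```

The missing entries of R_hat are the recommendations: each is the dot product of a user's and an item's latent factors.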

MLlib includes an implementation of the ALS algorithm, so in this section we will learn how to use this module of MLlib.

Training an ALS model

This library includes the method ALS.train( ), which directly trains an ALS model from an RDD of ratings. Use this function to train a recommender system.


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

from pyspark.mllib.recommendation import ALS

# Define parameters
n_latent_factors = 5
numIterations = 15

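# Train the model with n_latent_factors latent factors (rank) and numIterations iterations (keep seed=0L).
# The checkpoint directory lets Spark checkpoint intermediate results of the iterations.
sc.setCheckpointDir('checkpoint/')
model = ALS.train(<FILL IN>, seed=0L)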

Computing predictions

Once the model has been trained, let's make the recommendations. For this purpose, the ALS model provides the method model.predictAll(testdata), which estimates the ratings for an RDD of (userID, itemID) pairs.

Complete the next cell to estimate the ratings for the (user, item) pairs of our test data.


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# Create a new RDD, RDD_test_ids, consisting of (UserID, MovieID) pairs 
# that you extract from testRDD. That is, remove the field Rating from testRDD.
RDD_test_ids = # FILL IN

# Estimate their ratings with model.predictAll( )
predictions = # FILL IN

# Print the first 10 predictions
predictions.take(10)

In [ ]:
###########################################################
# TEST CELL
###########################################################
check_predictions = predictions.filter(lambda x: (x[0]==621) & (x[1]==68)).first()
Test.assertEquals(np.round(check_predictions[2],1), 3.7, 'incorrect result: predicted value is incorrect')
check_predictions = predictions.filter(lambda x: (x[0]==880) & (x[1]==8)).first()
Test.assertEquals(np.round(check_predictions[2],1), 4, 'incorrect result: predicted value is incorrect')

Note that, although each element of the RDD predictions is a Rating object, you can extract the UserID, ItemID and predicted rating by accessing its first, second, and third elements, respectively (they are also available as the fields user, product and rating). See the example in the next cell.
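This works because pyspark.mllib.recommendation.Rating is a namedtuple with the fields user, product and rating. The sketch below defines a local stand-in with the same field names (an assumption made so the example runs without Spark; the values are made up) to show that positional and named access refer to the same data:

```python
from collections import namedtuple

# Local stand-in with the same field names as pyspark.mllib.recommendation.Rating
Rating = namedtuple('Rating', ['user', 'product', 'rating'])

x = Rating(user=1, product=2, rating=3.5)
print('User ID: %d, Item ID: %d, Predicted rating: %.1f' % (x[0], x[1], x[2]))

# Like any tuple, it can also be unpacked directly
user, item, score = x
```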


In [ ]:
x = predictions.first()
print 'User ID: ' + str(x[0])
print 'Item ID: ' + str(x[1])
print 'Predicted rating: ' + str(x[2])

Advanced work

Which are the top 5 ranked items for the user with id 10, among the items of this user in the test set (the only pairs for which predictions were computed)?
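The filter-then-sort logic can be checked on a local list first. In this plain-Python sketch (made-up predictions), the list comprehension plays the role of filter() and sorted(..., reverse=True) plays the role of sortBy() with ascending=False, so the highest predicted ratings come first:

```python
# Toy predictions as (UserID, ItemID, predicted rating), made up for illustration
preds = [(10, 1, 3.2), (10, 2, 4.9), (7, 3, 5.0), (10, 4, 4.1),
         (10, 5, 2.5), (10, 6, 4.4), (10, 7, 3.9)]

# Keep the predictions of user 10 only (what filter() does)
user_preds = [p for p in preds if p[0] == 10]

# Sort by predicted rating, highest first, and keep the first 5
top5 = sorted(user_preds, key=lambda p: p[2], reverse=True)[:5]
top5_items = [p[1] for p in top5]   # [2, 6, 4, 7, 1]
```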


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

user_id = 10

# Select the outputs of the user_id=10 (hint: filter method)
predictions_userid = # FILL IN
# Sort the outputs according to rating field (hint: sortBy method)
predictions_userid_sorted = # FILL IN

predictions_userid_sorted.take(5)

In [ ]:
###########################################################
# TEST CELL
###########################################################

check_output = predictions_userid_sorted.map(lambda x:x[1]).take(5)

Test.assertEquals(check_output, [483, 127, 174, 701, 185], 'incorrect result: recommended items are incorrect')

Performance evaluation

Finally, let's evaluate the model performance over the test data using the get_MAE( ) and get_RMSE( ) functions.


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# Compute the MAE error 
MAE_als = # FILL IN
# Compute the RMSE error 
RMSE_als = # FILL IN

print 'ALS model ... MAE: %2.2f , RMSE: %2.2f ' % (MAE_als, RMSE_als)

In [ ]:
###########################################################
# TEST CELL
###########################################################

Test.assertEquals(np.round(MAE_als,2), 0.77, 'incorrect result: MAE value of ALS recommender is incorrect')
Test.assertEquals(np.round(RMSE_als,2), 1.01, 'incorrect result: RMSE value of ALS recommender is incorrect')

Advanced work: User-based recommendations

In this last section, we are going to implement a user-based collaborative filtering system on Spark.

As you know, the general algorithm has two steps:

  1. Compute the similarity of each user to the remaining ones and select those with a similarity larger than zero or than a given threshold.
  2. Estimate the rating of an item for a given user by averaging the ratings that the neighbors of this user have given to that item, weighted by their similarity.

To simplify the implementation, we start by precomputing all the similarities and then use them in the second step.

Step 1. Training the system: Finding similar users

In the next cell, you are given a function to compute the Pearson correlation coefficient defined as: $$ sim(user_a, user_b) = \frac{\sum_{p \in P} (r_{a,p} -\bar{r}_a)(r_{b,p} -\bar{r}_b)} {\sqrt{ \sum_{p \in P} (r_{a,p} -\bar{r}_a)^2} \sqrt{ \sum_{p \in P} (r_{b,p} -\bar{r}_b)^2}}$$

where $P$ is the set of items rated by both users $a$ and $b$, $r_{u,p}$ is the rating of user $u$ for item $p$, and $\bar{r}_u$ is the mean of all the ratings of user $u$.
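The plain-Python sketch below follows the formula term by term. Each user's ratings are stored as a dictionary keyed by item id (a hypothetical input format chosen for this sketch, unlike the (Iduser, iterable) tuples used by the function in the next cell). Looking up both users' ratings by the same item id guarantees that $r_{a,p}$ and $r_{b,p}$ refer to the same item $p$ in every term of the sums.

```python
import math

def pearson_sketch(ratings_a, ratings_b, n_items_th=1):
    """Pearson similarity between users a and b, following the formula above.

    ratings_a, ratings_b: dicts {item_id: rating} with ALL the ratings of each user.
    The means use all of a user's ratings; the sums run over the common items P only.
    """
    common = [p for p in ratings_a if p in ratings_b]   # the set P
    if len(common) < n_items_th:
        return 0.0
    mean_a = sum(ratings_a.values()) / float(len(ratings_a))
    mean_b = sum(ratings_b.values()) / float(len(ratings_b))
    # Both lists are built over the same item order, so their terms stay aligned
    da = [ratings_a[p] - mean_a for p in common]
    db = [ratings_b[p] - mean_b for p in common]
    num = sum(x * y for x, y in zip(da, db))
    den = math.sqrt(sum(x * x for x in da)) * math.sqrt(sum(y * y for y in db))
    return num / den if den > 0 else 0.0   # zero denominator -> similarity 0

u1 = {1: 5.0, 2: 3.0, 3: 4.0}
u2 = {3: 4.0, 1: 5.0, 2: 3.0}   # same tastes, items listed in another order
u3 = {1: 3.0, 2: 5.0, 3: 4.0}   # opposite tastes on items 1 and 2
```

Here pearson_sketch(u1, u2) is 1 and pearson_sketch(u1, u3) is -1, up to floating-point rounding.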


In [ ]:
def compute_Pearson_correlation(ratings_u1, ratings_u2, n_items_th = 1):
    """ Calculate correlation coefficient
    Args:
       ratings_u1: tuple (Iduser, pyspark iterable), where the iterable has tuples (item, rating) with all the ratings of user 1
       ratings_u2: tuple (Iduser, pyspark iterable), where the iterable has tuples (item, rating) with all the ratings of user 2
       n_items_th: minimum number of items that both users must have rated to compute their similarity.
       If the users have fewer than n_items_th common rated items, their similarity is set to zero.
       By default, n_items_th is set to 1.
    Returns:
        corr_value: correlation coefficient
    """
    # Get the items and values rated by user 1 
    [items_u1, values_u1] = zip(*list(ratings_u1[1]))
    # Get the items and values rated by user 2
    [items_u2, values_u2] = zip(*list(ratings_u2[1]))
    
    # Get the set of items rated by both users and their values
    r_u1 = [values_u1[i] for i, item in enumerate(items_u1) if item in items_u2]
    r_u2 = [values_u2[i] for i, item in enumerate(items_u2) if item in items_u1]
    
    if len(r_u1) >= n_items_th: # If there are enough common rated items...
        # Compute the means of the user ratings
        m_1 = np.mean(np.array(values_u1))
        m_2 = np.mean(np.array(values_u2))
        # Remove their means        
        r_u1 = r_u1 - m_1 
        r_u2 = r_u2 - m_2 
        # Compute the correlation coefficient
        corr_value = np.dot(r_u1,r_u2.T)/(np.sqrt(np.dot(r_u1,r_u1.T))*np.sqrt(np.dot(r_u2,r_u2.T)))
        # Remove useless dimensions
        corr_value =np.squeeze(corr_value)
        
    else: # Else correlation is 0
        corr_value = 0
        
    # Check that the correlation is not NaN (this would happen if the denominator is 0);
    # in this case, set the correlation coefficient to 0
    if math.isnan(corr_value): 
        corr_value = 0
        
    return corr_value

Now, complete the next cell to evaluate the function compute_Pearson_correlation( ) on users 1 and 2.


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# 1. From trainingRDD, create an RDD where each element is (userID, (ItemID, rating)), i.e.,
# the userID is the key and the pair (ItemID, rating) is the value.
RDD_users_as_key = #FILL IN

# 2. Group the elements of RDD_users_as_key by key (see groupByKey() method)
# Each element of this new RDD is (userID, spark-iterable), where the spark iterable has 
# a list with all the rated items elements (ItemID, rating)
RDD_users_ratings = #FILL IN

# 3. Extract the elements (userID, spark-iterable) with all the ratings of users 1 and 2
id_u1 = 1
ratings_u1 =  #FILL IN
id_u2 = 2
ratings_u2 =  #FILL IN

# 4. Compute their similarity
n_items_th = 4
similarity = compute_Pearson_correlation(ratings_u1, ratings_u2, n_items_th)
print similarity

In [ ]:
###########################################################
# TEST CELL
###########################################################

Test.assertEquals(np.round(similarity,2), 0.80, 'incorrect result: similarity value is incorrect')

Once we can compute the similarity between two users, let's compute, for each user, its similarity with all the remaining users. The output of this cell will be an RDD of similarities where each element is (UserID, spark-iterable), where spark-iterable is an iterable list of (UserID, similarity) pairs.

Note that it is enough for this list to keep only the users with a similarity larger than zero or larger than a given threshold.
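The cartesian product grows quadratically: with the 943 users of this dataset it produces 943 × 943 = 889,249 ordered pairs, each requiring a correlation, which is why RDD_sim_users is cached. The plain-Python sketch below (hypothetical similarity values, itertools.product standing in for cartesian()) shows the shape of each step. Note that cartesian() also yields the self-pairs (u, u) and both orders of every pair; the sketch drops the self-pairs as a design choice. A user with no neighbor above the threshold simply has no entry in the result.

```python
from collections import defaultdict
from itertools import product

# Hypothetical similarity values between user ids, made up for illustration;
# in the notebook this role is played by compute_Pearson_correlation()
toy_sim = {(1, 2): 0.8, (1, 3): 0.1, (2, 3): -0.4}

def sim(a, b):
    if a == b:
        return 1.0
    return toy_sim.get((min(a, b), max(a, b)), 0.0)

users = [1, 2, 3]
pairs = list(product(users, users))   # like cartesian(): all 9 ordered pairs, incl. (u, u)
scored = [(a, b, sim(a, b)) for (a, b) in pairs]

sim_th = 0.2
# Keep pairs above the threshold; self-pairs are dropped here as a design choice
selected = [(a, b, s) for (a, b, s) in scored if s > sim_th and a != b]

# Re-key by the first user and group (like map() + groupByKey())
neighbors = defaultdict(list)
for a, b, s in selected:
    neighbors[a].append((b, s))
# {1: [(2, 0.8)], 2: [(1, 0.8)]}; user 3 has no neighbor above the threshold
```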


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################
    
# 1. From trainingRDD, create a new RDD with elements (userID, spark-iterable), where
# spark iterable has a list [(ItemID, rating), ...] with all the items rated by UserID
# (see previous section)
RDD_users_ratings  = # FILL IN

# 2. Create all the combinations of pairs with the users (see cartesian method of RDD elements)
# Note that cartesian returns an RDD with elements ((id_1, iterable_ratings 1), (id_2, iterable_ratings 2))
pairs_users = # FILL IN

# 3. Compute correlation values with the function compute_Pearson_correlation() 
n_items_th = 4
correlation_values = # FILL IN

# 4. Select correlation values larger than the similarity threshold (filter method)
sim_th = 0.2
correlation_values_sel = # FILL IN

# 5. Let's reorganize each element of the RDD to get user 1 as key and the tuple 
# (user2, similarity) as value
all_correlations_with_userid = # FILL IN

# 6. Group the elements of all_correlations_with_userid by key (groupByKey() method)
# Each element of this new RDD is (userID, spark-iterable), where the spark iterable has 
# a list with all the similar users (UserID, similarity)
RDD_sim_users = # FILL IN
RDD_sim_users.cache()

In [ ]:
###########################################################
# TEST CELL
###########################################################

id_user = 1
sim_user1 = RDD_sim_users.filter(lambda x : x[0]==id_user).first()
sim_check = sc.parallelize(list(sim_user1[1]))

Test.assertEquals(np.round(sim_check.filter(lambda x: x[0] == 22).first()[1],2), 0.34, 'incorrect result: similarity value is incorrect')
Test.assertEquals(np.round(sim_check.filter(lambda x: x[0] == 120).first()[1],2), 0.37, 'incorrect result: similarity value is incorrect')

Step 2. Making predictions

Once you know how similar a user is to other users, you would like to know which items should be recommended to this user.

For this purpose, we assign a rating to each item by computing a similarity-weighted average of the mean-centered ratings that the similar users have given to that item, according to this expression: $$ pred(user_a, item_i) = \bar{r}_a + \frac{\sum_{b \in N} sim(user_a, user_b) (r_{b,i} - \bar{r}_b)}{\sum_{b \in N} sim(user_a, user_b)}$$ where $N$ is the set of neighbors of user $a$ (users with $sim > sim_{th}$) who have rated item $i$.
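As a small worked example with made-up numbers, suppose $\bar{r}_a = 3.5$ and two neighbors $b_1, b_2$ of user $a$ have rated item $i$, with $sim(user_a, user_{b_1}) = 0.5$, $r_{b_1,i} - \bar{r}_{b_1} = 1.0$, $sim(user_a, user_{b_2}) = 0.25$ and $r_{b_2,i} - \bar{r}_{b_2} = -0.5$. Then:

$$ pred(user_a, item_i) = 3.5 + \frac{0.5 \cdot 1.0 + 0.25 \cdot (-0.5)}{0.5 + 0.25} = 3.5 + \frac{0.375}{0.75} = 4.0 $$

The more similar neighbor pulls the prediction above the user's mean more strongly than the less similar one pulls it down.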

The next cell contains the code to compute the above expression for a given user and item. Review this function, paying special attention to its input parameters.


In [ ]:
def compute_predictions(med_user, list_sim, list_ratings):
    """ Estimate the rating that a user u would assign to an item i
    Args:
        med_user: average rating of the user u
        list_sim: list of tuples (id_user, similarity) with the users who are
        similar to the user u and their similarity values
        list_ratings: list of tuples (id_user, rating) with the ratings that the remaining
        users have already assigned to the item i. Note that the rating values are normalized
        (the average rating of the corresponding user has been previously subtracted so that
        this function implements the above expression)
    Returns:
        pred_value: estimated rating for the user u to the item i
    """
    if (list_sim is not None) and (list_ratings is not None):
        dict1 = dict(list_sim)
        dict2 = dict(list_ratings)
        # Build a list with (user_id_similar, sim_value, rating_user_sim)
        list_intersect = [(k, dict1[k], dict2[k]) for k in sorted(dict1) if k in dict2]
        if len(list_intersect) > 0:
            aux = [(sim*rat, sim) for (id_user, sim, rat) in list_intersect]
            numerator, denominator = zip(*aux)
            pred_value = med_user + sum(numerator)/sum(denominator)
        else:
            # No similar user has rated item i: fall back to the user's mean
            pred_value = med_user
    else:
        # Missing similarity or rating list (None after a leftOuterJoin): use the user's mean
        pred_value = med_user
    return pred_value

To obtain the predicted outputs for the test data and evaluate the performance of the user-based recommender, we first need to compute all the input arguments of this function. Follow the steps in the next sections to obtain them.

1. Computing the average rating of each user

Please review Subsection 2 of the Baseline recommender section.


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# 1. From trainingRDD, create a new RDD with the fields (user, rating), and convert it to
# (user, list_ratings). Hint: groupByKey()
RDD_users_ratings = # FILL IN
# Convert this RDD (user, list_ratings) -> (user, mean_user). Use getAverages() function
RDD_users_mean  = # FILL IN

In [ ]:
###########################################################
# TEST CELL
###########################################################

id_user = 1
mean_user1 = RDD_users_mean.filter(lambda x : x[0]==id_user).first()

Test.assertEquals(np.round(mean_user1[1],2), 3.6, 'incorrect result: mean rating value is incorrect')

2. Create a list of ratings

Here, you should create a new RDD with one element per item, of the form (item_id, list_ratings), where list_ratings contains the tuples (user_id, rating) of every user who has rated item_id. In addition, the ratings in the list have to be normalized by subtracting the corresponding user's average rating.
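The transformation is easier to picture on a toy example. This plain-Python sketch (made-up ratings, with a dictionary lookup standing in for the join with the users' means and collections.defaultdict standing in for groupByKey()) subtracts each user's mean and then re-keys the ratings by item:

```python
from collections import defaultdict

# Toy training triples (UserID, ItemID, Rating) and their user means, made up for illustration
training = [(1, 22, 4.0), (1, 30, 2.0), (2, 22, 3.0), (2, 40, 5.0)]
user_mean = {1: 3.0, 2: 4.0}

# Subtract each user's mean: (UserID, ItemID, normalized rating)
normalized = [(u, i, r - user_mean[u]) for (u, i, r) in training]

# Re-key by item and group: ItemID -> [(UserID, normalized rating), ...]
by_item = defaultdict(list)
for u, i, r in normalized:
    by_item[i].append((u, r))
# by_item[22] -> [(1, 1.0), (2, -1.0)]
```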


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# 2.1. Create an RDD with training ratings subtracting the users' mean
# Create an RDD with elements (user, (item, rating)) 
trainingRDD_aux = # FILL IN

# Combine it with the users_mean -> (user, ((item, rating), user_mean))
# Hint: leftOuterJoin()
trainingRDD_mean = # FILL IN
# Create a new RDD subtracting the mean of each rating and reorganize it -> (user, item, rating_norm)
trainingRDD_norm = # FILL IN

# 2.2. Create an RDD with normalized training ratings with the form (item, list((user, rating)))
RDD_ratings_item = # FILL IN

In [ ]:
###########################################################
# TEST CELL
###########################################################

id_item = 22
ratings_item = RDD_ratings_item.filter(lambda x : x[0]==id_item).first()
ratings_check = sc.parallelize(list(ratings_item[1]))

Test.assertEquals(np.round(ratings_check.filter(lambda x: x[0] == 608).first()[1],2), 0.26, 'incorrect result: rating value is incorrect')
Test.assertEquals(np.round(ratings_check.filter(lambda x: x[0] == 184).first()[1],2), -0.66, 'incorrect result: rating value is incorrect')

3. Combine previous RDDs

So far, we have built these RDDs:

  • RDD_sim_users
  • RDD_users_mean
  • RDD_ratings_item

To make the predictions over the test data, for each (userID, itemID) pair of the test ratings we need to build an element containing:

  • the average rating of userID;
  • the similar users of userID;
  • the list of ratings of itemID;

so that we can call the compute_predictions function with the corresponding input parameters.

Here, we combine the above RDDs to create a new RDD with elements given by: ((userID, itemID), average_rating_userID, list_similar_users_to_userID, list_ratings_itemID)
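The sketch below shows the target element shape on made-up data, with plain dictionaries standing in for the three keyed RDDs. Each dict.get() plays the role of one leftOuterJoin(): every test pair survives, and missing information shows up as None (here, user 3 has no similar users). The compute_predictions() function given earlier already falls back to the user's mean when either list is None.

```python
# Toy inputs with the same shapes as the three RDDs above (values made up)
test_ids = [(1, 22), (3, 22)]                 # (UserID, ItemID)
users_mean = {1: 3.0, 3: 3.5}                 # UserID -> mean rating
sim_users = {1: [(2, 0.8)]}                   # UserID -> [(UserID, similarity)]; user 3 has none
ratings_item = {22: [(2, -1.0), (5, 0.5)]}    # ItemID -> [(UserID, normalized rating)]

# ((user, item), mean_user, list_sim_user, list_item_rating)
combined = [((u, i), users_mean.get(u), sim_users.get(u), ratings_item.get(i))
            for (u, i) in test_ids]
# combined[1] -> ((3, 22), 3.5, None, [(2, -1.0), (5, 0.5)])
```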


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

# 3.1 Create an input RDD, RDD_test_ids, consisting of (UserID, MovieID) pairs
# that you extract from testRDD (i.e., remove the field rating)
RDD_test_ids = # FILL IN

# 3.2 Combine RDD_test_ids with RDD_users_mean to create an RDD (user, (item, mean_user))
# Hint: leftOuterJoin()
RDD_test_ids_mean = # FILL IN

# 3.3 Combine RDD_test_ids_mean with RDD_sim_users to create an RDD with elements 
# (user, ((item, mean_user), list_sim_user)). Hint: leftOuterJoin()
# Next, reformat it to obtain elements (item, (user, mean_user, list_sim_user))
RDD_test_ids_sim = # FILL IN

# 3.4 Combine RDD_test_ids_sim with RDD_ratings_item to create an RDD with elements
# (item, ((user, mean_user , list_sim_user), list_item_rating)). Hint: leftOuterJoin()
# Next, reformat it to obtain elements ((user, item), mean_user, list_sim_user, list_item_rating)
RDD_test_ids_sim_rat = # FILL IN

In [ ]:
###########################################################
# TEST CELL
###########################################################

check_out = RDD_test_ids_sim_rat.filter(lambda x: x[0]==(218, 516)).first()
Test.assertEquals(np.round(check_out[1],2), 3.62, 'incorrect result: mean value of the RDD is incorrect')

sim_check = sc.parallelize(list(check_out[2]))
Test.assertEquals(np.round(sim_check.filter(lambda x: x[0] == 24).first()[1],2), 0.31, 'incorrect result: similarity value is incorrect')

rating_check = sc.parallelize(list(check_out[3]))
Test.assertEquals(np.round(rating_check.filter(lambda x: x[0] == 308).first()[1],2), 0.23, 'incorrect result: rating value is incorrect')

4. Compute predictions

Complete the next cell to use RDD_test_ids_sim_rat elements as inputs of the function compute_predictions( ) and obtain the predicted ratings over the test data.


In [ ]:
###########################################################
# TODO: Replace <FILL IN> with appropriate code
###########################################################

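# For each element of RDD_test_ids_sim_rat, call compute_predictions and create a new RDD
# with elements ((user, item), predicted value)
RDD_outputs = # FILL IN
# Reformat RDD_outputs into tuples (user, item, predicted value), as expected by get_MAE() and get_RMSE()
RDD_predictions = # FILL IN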

In [ ]:
###########################################################
# TEST CELL
###########################################################

Test.assertEquals(np.round(RDD_predictions.filter(lambda x: (x[0], x[1]) == (840, 516)).first()[2],2), 4.8, 'incorrect result: predicted value is incorrect')
Test.assertEquals(np.round(RDD_predictions.filter(lambda x: (x[0], x[1]) == (174, 1032)).first()[2],2), 3.28, 'incorrect result: predicted value is incorrect')
Test.assertEquals(np.round(RDD_predictions.filter(lambda x: (x[0], x[1]) == (896, 12)).first()[2],2), 3.83, 'incorrect result: predicted value is incorrect')
Test.assertEquals(np.round(RDD_predictions.filter(lambda x: (x[0], x[1]) == (59, 528)).first()[2],2), 4.18, 'incorrect result: predicted value is incorrect')

5. Evaluate performance


In [ ]:
# Compute the error MAE
MAE = get_MAE(RDD_predictions, testRDD)
# Compute the error RMSE
RMSE = get_RMSE(RDD_predictions, testRDD)

print 'User based model ... MAE: %2.2f , RMSE: %2.2f ' % (MAE, RMSE)

In [ ]:
###########################################################
# TEST CELL
###########################################################

Test.assertEquals(np.round(MAE,2), 0.80, 'incorrect result: MAE value is incorrect')
Test.assertEquals(np.round(RMSE,2), 1.02, 'incorrect result: RMSE value is incorrect')